package com.remoter.registry.zookeeper;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
import org.apache.curator.framework.recipes.cache.TreeCache;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
import org.apache.curator.framework.recipes.cache.TreeCacheEvent.Type;
import org.apache.curator.framework.recipes.cache.TreeCacheListener;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

import com.remoter.api.configure.IConfiguration;
import com.remoter.api.configure.support.AbstractConfiguration;
import com.remoter.api.context.support.Exporter;
import com.remoter.api.context.support.Importer;
import com.remoter.api.extension.annotation.ExtensionName;
import com.remoter.api.extension.support.ExtensionLoader;
import com.remoter.api.registry.IRegistryListener;
import com.remoter.api.registry.support.AbstractRegistry;
import com.remoter.api.serialize.ISerialization;
import com.remoter.api.util.Final;
import com.remoter.api.util.StringUtil;
import com.remoter.registry.zookeeper.util.FinalRegistryZookeeper;

@ExtensionName("zookeeper")
public class ZookeeperRegistry extends AbstractRegistry{
	
	private final IConfiguration configuration;
	private final ISerialization serialization;
	private CuratorFramework curatorFramework;
	private TreeCache treeCache = null;
	
	public ZookeeperRegistry(){
		this.configuration = AbstractConfiguration.getConfiguration();
		this.serialization = ExtensionLoader.getService(ISerialization.class,this.configuration.getOption(Final.O_SERVER_SERIALIZE));
		this.start();
	}
	
	@Override
	public boolean attachExporter(Exporter exporter){
		if(null == exporter){
			return false;
		}
		try(ByteArrayOutputStream baos = new ByteArrayOutputStream()){
			String path = exporter.getPath();
			if(this.curatorFramework.checkExists().forPath(path) != null){
				this.curatorFramework.delete().forPath(path);
			}
			this.serialization.serialize(exporter,baos);
			byte[] data = baos.toByteArray();
			this.curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,data);
			return true;
		}catch(Exception e){
			logger.warn(e.getMessage(),e);
		}
		return false;
	}

	@Override
	public boolean attachImporter(Importer importer){
		if(null == importer){
			return false;
		}
		try(ByteArrayOutputStream baos = new ByteArrayOutputStream()){
			String path = importer.getPath();
			if(this.curatorFramework.checkExists().forPath(path) != null){
				this.curatorFramework.delete().forPath(path);
			}
			this.serialization.serialize(importer,baos);
			byte[] data = baos.toByteArray();
			this.curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,data);
			return true;
		}catch(Exception e){
			logger.warn(e.getMessage(),e);
		}
		return false;
	}

	@Override
	public boolean detachExporter(Exporter exporter){
		if(null == exporter){
			return false;
		}
		try{
			String path = exporter.getPath();
			if(null != this.curatorFramework.checkExists().forPath(path)){
				this.curatorFramework.delete().forPath(path);
				return true;
			}
			return false;
		}catch(Exception e){
			logger.warn(e.getMessage(),e);
		}
		return false;
	}

	@Override
	public boolean detachImporter(Importer importer){
		if(null == importer){
			return false;
		}
		try{
			String path = importer.getPath();
			if(null != this.curatorFramework.checkExists().forPath(path)){
				this.curatorFramework.delete().forPath(path);
				return true;
			}
			return false;
		}catch(Exception e){
			logger.warn(e.getMessage(),e);
		}
		return false;
	}

	@Override
	public List<Exporter> exporters(){
		List<Exporter> exporters = new ArrayList<Exporter>();
		this.list(Exporter.class,"/",exporters,Match.EXPORTER_MATCH);
		return exporters;
	}

	@Override
	public List<Importer> importers(){
		List<Importer> importers = new ArrayList<Importer>();
		this.list(Importer.class,"/",importers,Match.IMPORTER_MATCH);
		return importers;
	}
	
	@SuppressWarnings("unchecked")
	private <T> void list(Class<T> clazz,String path,List<T> list,Match match){
		try{
			if(null != this.curatorFramework.checkExists().forPath(path)){
				List<String> paths = this.curatorFramework.getChildren().forPath(path);
				if(null != paths && paths.size() > 0){
					for(String p : paths){
						if(!p.startsWith("/")){
							p = "/" + p;
						}
						if(path.endsWith("/")){
							path = path.substring(0,path.length()-1);
						}
						p = path + p;
						if(match.matching(p)){
							byte[] byteData = this.curatorFramework.getData().forPath(p);
							if(null != byteData && byteData.length > 0){
								try(ByteArrayInputStream bais = new ByteArrayInputStream(byteData);){
									Object obj = this.serialization.deserialize(bais);
									list.add((T)obj);
								}
							}
						}
						this.list(clazz,p,list,match);
					}
				}
			}
		}catch(Exception e){
			logger.warn(e.getMessage(),e);
		}
	}
	
	@Override
	public void start(){
		Builder builder = CuratorFrameworkFactory.builder();
		builder.retryPolicy(new ExponentialBackoffRetry(this.configuration.getOption(FinalRegistryZookeeper.O_REGISTRY_RETRY_TIME_SLEEP),this.configuration.getOption(FinalRegistryZookeeper.O_REGISTRY_RETRY_TIME_COUNT)));
		builder.connectionTimeoutMs(this.configuration.getOption(FinalRegistryZookeeper.O_REGISTRY_CONNECTION_TIMEOUT));
		builder.connectString(this.configuration.getOption(FinalRegistryZookeeper.O_REGISTRY_CONNECTION_ADDRESS));
		builder.namespace(this.configuration.getOption(FinalRegistryZookeeper.O_REGISTRY_NAMESPACE));
		builder.sessionTimeoutMs(this.configuration.getOption(FinalRegistryZookeeper.O_REGISTRY_SESSION_TIMEOUT));
		builder.defaultData(null);
		this.curatorFramework = builder.build();
		this.curatorFramework.getConnectionStateListenable().addListener(new ConnectionStateListener(){
			@Override
			public void stateChanged(CuratorFramework client,ConnectionState newState){
				if(newState == ConnectionState.LOST){
					logger.warn("zookeeper is lost ,try connect");
					ZookeeperRegistry.this.start();
				}
			}
		});
		this.curatorFramework.start();
		this.watching("/");
	}

	@Override
	public void close(){
		if(null != this.treeCache){
			this.treeCache.close();
		}
		if(null != this.curatorFramework){
			this.curatorFramework.close();
		}
	}
	
	private void watching(String path){
		try{
			treeCache = new TreeCache(this.curatorFramework,path);
			treeCache.getListenable().addListener(this.childListener);
			treeCache.start();
		}catch(Exception e){
			e.printStackTrace();
			if(null != treeCache){
				treeCache.close();
			}
		}
	}
	
	private static interface Match{
		public static final Match EXPORTER_MATCH = new ExporterMatch();
		public static final Match IMPORTER_MATCH = new ImporterMatch();
		public boolean matching(String path);
	}
	
	private static class ExporterMatch implements Match{
		@Override
		public boolean matching(String path) {
			if(StringUtil.isBlank(path)){
				return false;
			}
			if(path.indexOf("/providers/")>0){
				return true;
			}else{
				return false;
			}
		}
	}
	
	private static class ImporterMatch implements Match{
		@Override
		public boolean matching(String path) {
			if(StringUtil.isBlank(path)){
				return false;
			}
			if(path.indexOf("/consumers/")>0){
				return true;
			}else{
				return false;
			}
		}
	}
	
	private TreeCacheListener childListener = new TreeCacheListener(){
		@Override
		public void childEvent(CuratorFramework client,TreeCacheEvent event) throws Exception {
			if(null == event || null == event.getData() || null == event.getData().getPath() || null == event.getData().getData()){
				return;
			}
			String path = event.getData().getPath();
			byte[] data = event.getData().getData();
			if(null == data || data.length == 0){
				return;
			}
			List<IRegistryListener> urlListeners = ZookeeperRegistry.this.listeners;
			if(null == urlListeners || urlListeners.size() == 0){
				return;
			}
			try(ByteArrayInputStream bais = new ByteArrayInputStream(data)){
				Object result = serialization.deserialize(bais);
				for(IRegistryListener listener : urlListeners){
					if(event.getType() == Type.NODE_ADDED){
						listener.onCreate(path,result);
					}else if(event.getType() == Type.NODE_REMOVED){
						listener.onDelete(path,result);
					}else if(event.getType() == Type.NODE_UPDATED){
						listener.onUpdate(path,result);
					}
				}
			}
		}
	};
	
}